package org.zjvis.datascience.service.dag;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.*;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;
import org.zjvis.datascience.common.algopy.ImputationUtil;
import org.zjvis.datascience.common.constant.Constant;
import org.zjvis.datascience.common.dto.PipelineInstanceDTO;
import org.zjvis.datascience.common.dto.TaskDTO;
import org.zjvis.datascience.common.dto.TaskInstanceDTO;
import org.zjvis.datascience.common.enums.*;
import org.zjvis.datascience.common.etl.FilterHelper;
import org.zjvis.datascience.common.exception.DataScienceException;
import org.zjvis.datascience.common.model.Transpose;
import org.zjvis.datascience.common.util.*;
import org.zjvis.datascience.common.vo.TaskVO;
import org.zjvis.datascience.service.*;
import org.zjvis.datascience.service.dataprovider.GPDataProvider;
import org.zjvis.datascience.service.graph.GraphInstanceService;
import org.zjvis.datascience.service.graph.GraphService;
import org.zjvis.datascience.service.graph.JanusGraphEmbedService;

import static org.zjvis.datascience.common.constant.DataJsonConstant.LAST_TIMESTAMP_HEADER;
import static org.zjvis.datascience.common.constant.DataJsonConstant.OUTPUT_HEADER;

/**
 * @description DAG task scheduler — builds task instances per pipeline run and drives their execution
 * @date 2021-12-24
 */
@Service
public class DAGScheduler {

    // Persists and queries per-run task instances.
    @Autowired
    private TaskInstanceService taskInstanceService;

    // Persists and queries pipeline-level run records (sessions).
    @Autowired
    private PipelineInstanceService pipelineInstanceService;

    // @Lazy breaks a circular dependency between TaskService and this scheduler.
    @Lazy
    @Autowired
    private TaskService taskService;

    @Autowired
    private GPDataProvider gpDataProvider;

    // Used to kill remote jobs by applicationId during cleanup.
    @Autowired
    private RestTemplateUtil restTemplateUtil;

    @Autowired
    private EngineSelector engineSelector;

    @Autowired
    private TColumnService tColumnService;

    @Autowired
    private GraphService graphService;

    // ML model metadata and predict-job submission (Flask / Spark).
    @Autowired
    private MLModelService mlModelService;

    @Autowired
    private JlabService jlabService;

    @Autowired
    private GraphInstanceService graphInstanceService;

    @Autowired
    private JanusGraphEmbedService janusGraphEmbedService;

    @Autowired
    private OperatorTemplateService operatorTemplateService;

    // Central registry of sessions, task futures and timestamps.
    @Autowired
    private TaskManager taskManager;

    // Dedicated "schedule" logger (configured in the logging backend).
    private final static Logger logger = LoggerFactory.getLogger("schedule");

    // Worker pool for task execution; sized 100 — presumably tuned for load tests, TODO confirm.
    private ExecutorService executor = Executors.newFixedThreadPool(100);

    // Same pool, exposed as ThreadPoolExecutor for queue/active-count metrics in run().
    private ThreadPoolExecutor tpe = ((ThreadPoolExecutor) executor);

    // applicationId -> taskInstance mapping
    private Map<String, TaskInstanceDTO> appIdInstanceMap = Maps.newConcurrentMap();

    // taskInstanceId -> applicationId of jobs flagged for termination (drained by cleanup()).
    private Map<Long, String> needStopTaskMap = Maps.newConcurrentMap();

    // taskInstanceId -> number of failed stop attempts (gives up past a threshold).
    private Map<Long, Integer> retryStopTaskMap = Maps.newConcurrentMap();

    /**
     * Recursively walks the parent chain of {@code currentId} and reports
     * whether any ancestor task is in the forbidden set.
     *
     * @param forbiddenTasks  ids of tasks marked forbidden
     * @param taskRelationMap taskId -> comma-separated parent id list
     * @param currentId       task whose ancestors are inspected
     * @return true if at least one ancestor is forbidden
     */
    private boolean isAncestorForbidden(Set<Long> forbiddenTasks, Map<Long, String> taskRelationMap,
                                        Long currentId) {
        String parentIds = taskRelationMap.get(currentId);
        if (StringUtils.isEmpty(parentIds)) {
            return false;
        }
        for (String idStr : parentIds.split(",")) {
            Long parentId = Long.parseLong(idStr);
            if (forbiddenTasks.contains(parentId)
                    || isAncestorForbidden(forbiddenTasks, taskRelationMap, parentId)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Checks every active session and reports whether the given task still has
     * a running instance in any of them.
     */
    public Boolean isStillRunning(Long taskId) {
        boolean running = taskManager.getTaskHolder().keySet().stream()
                .anyMatch(sessionId -> taskManager.isStillRunning(sessionId, taskId));
        return running ? Boolean.TRUE : Boolean.FALSE;
    }

    /**
     * Reports whether this task, or any of its ancestors, failed in its last
     * run — in which case the subtree must be re-executed.
     *
     * @param lastFailTasks   ids of tasks whose last run was not SUCCESS
     * @param taskRelationMap taskId -> comma-separated parent id list
     * @param currentId       task to check (itself included)
     * @return true if the task or an ancestor failed last time
     */
    private boolean isAncestorLastTimeFail(Set<Long> lastFailTasks,
                                           Map<Long, String> taskRelationMap,
                                           Long currentId) {
        if (lastFailTasks.isEmpty()) {
            return false;
        }
        if (lastFailTasks.contains(currentId)) {
            return true;
        }
        String parentIds = taskRelationMap.get(currentId);
        if (StringUtils.isEmpty(parentIds)) {
            return false;
        }
        for (String idStr : parentIds.split(",")) {
            Long parentId = Long.parseLong(idStr);
            if (lastFailTasks.contains(parentId)
                    || isAncestorLastTimeFail(lastFailTasks, taskRelationMap, parentId)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Tells whether a task is properly connected in the DAG. Algorithm, ETL
     * and clean tasks require at least one parent; every other type (or an
     * untyped task) is considered linked.
     */
    private boolean isLinked(TaskDTO task) {
        Integer type = task.getType();
        if (type == null) {
            return true;
        }
        boolean requiresParent = type.equals(TaskTypeEnum.TASK_TYPE_ALGO.getVal())
                || type.equals(TaskTypeEnum.TASK_TYPE_ETL.getVal())
                || type.equals(TaskTypeEnum.TASK_TYPE_CLEAN.getVal());
        if (!requiresParent) {
            return true;
        }
        return StringUtils.isNotEmpty(task.getParentId());
    }

    /**
     * Collects the descendants of the given task (the task itself included)
     * via a breadth-first walk over child links.
     *
     * @param taskMap       all tasks of the pipeline, keyed by id
     * @param currentTaskId starting task
     * @return id -> task for every reachable descendant
     */
    private Map<Long, TaskDTO> getDescendants(Map<Long, TaskDTO> taskMap, Long currentTaskId) {
        Map<Long, TaskDTO> descendants = new HashMap<>();
        Queue<Long> pending = new ArrayDeque<>();
        pending.add(currentTaskId);
        for (Long id = pending.poll(); id != null; id = pending.poll()) {
            TaskDTO task = taskMap.get(id);
            if (task == null) {
                continue;
            }
            descendants.put(id, task);
            String children = task.getChildId();
            if (StringUtils.isNotEmpty(children)) {
                for (String child : children.split(",")) {
                    pending.add(Long.parseLong(child));
                }
            }
        }
        return descendants;
    }

    /**
     * Collects the ancestors of the given task (the task itself included)
     * via a breadth-first walk over parent links; mirror of getDescendants.
     */
    private Map<Long, TaskDTO> getAncestors(Map<Long, TaskDTO> taskMap, Long currentTaskId) {
        Map<Long, TaskDTO> ancestors = new HashMap<>();
        Queue<Long> pending = new ArrayDeque<>();
        pending.add(currentTaskId);
        for (Long id = pending.poll(); id != null; id = pending.poll()) {
            TaskDTO task = taskMap.get(id);
            if (task == null) {
                continue;
            }
            ancestors.put(id, task);
            String parents = task.getParentId();
            if (StringUtils.isNotEmpty(parents)) {
                for (String parent : parents.split(",")) {
                    pending.add(Long.parseLong(parent));
                }
            }
        }
        return ancestors;
    }

    /**
     * Walks the ancestors of a task breadth-first, producing one id-set per
     * visited node (its direct parents, or the node itself for a root), then
     * appends a {0} sentinel and reverses so roots come first.
     *
     * <p>Each node is loaded from the database via taskService.
     */
    private List<Set<Long>> getAncestorsWithLevel(Long currentTaskId) {
        List<Set<Long>> levels = new ArrayList<>();
        Queue<Long> pending = new ArrayDeque<>();
        pending.add(currentTaskId);

        while (!pending.isEmpty()) {
            Long id = pending.poll();
            TaskDTO task = taskService.queryById(id);
            if (task == null) {
                continue;
            }
            Set<Long> level = new HashSet<>();
            String parentIds = task.getParentId();
            if (StringUtils.isEmpty(parentIds)) {
                // root task: record itself as its own level
                level.add(id);
            } else {
                for (String parent : parentIds.split(",")) {
                    Long parentId = Long.parseLong(parent);
                    pending.add(parentId);
                    level.add(parentId);
                }
            }
            if (!level.isEmpty()) {
                levels.add(level);
            }
        }
        // sentinel {0} marks the chain end; after reversal it sits first
        Set<Long> sentinel = new HashSet<>();
        sentinel.add(0L);
        levels.add(sentinel);
        return CollectionUtil.reverse(levels);
    }

    /**
     * Merges the two maps into a new one; descendant entries win on key
     * collisions (same put order as before).
     */
    private Map<Long, TaskDTO> mergeAncestorAndDescendant(Map<Long, TaskDTO> ancestor,
                                                          Map<Long, TaskDTO> descendant) {
        Map<Long, TaskDTO> merged = new HashMap<>(ancestor);
        merged.putAll(descendant);
        return merged;
    }

    /**
     * Reads the "lastStatus" field from the task's data JSON, or "" when the
     * data or the field is absent.
     */
    private String getLastStatusForTask(TaskDTO taskDTO) {
        JSONObject data = taskDTO.view().getData();
        if (data != null && data.containsKey("lastStatus")) {
            return data.getString("lastStatus");
        }
        return "";
    }

    /**
     * Reads the "lastTimeStamp" field from the task's data JSON, or 0L when
     * the data or the field is absent.
     */
    private Long getLastTimeStamp(TaskDTO taskDTO) {
        JSONObject data = taskDTO.view().getData();
        if (data != null && data.containsKey("lastTimeStamp")) {
            return data.getLong("lastTimeStamp");
        }
        return 0L;
    }

    /**
     * Stores the given timestamp under "lastTimeStamp" in the task's data JSON
     * and writes the serialized JSON back onto the DTO (DB persist is the
     * caller's responsibility).
     */
    public void setLastTimeStamp(TaskDTO taskDTO, Long timestamp) {
        JSONObject data = taskDTO.view().getData();
        data.put("lastTimeStamp", timestamp);
        taskDTO.setDataJson(JSONObject.toJSONString(data));
    }

    /**
     * Stores the given status under "lastStatus" in the task's data JSON and
     * writes the serialized JSON back onto the DTO (DB persist is the
     * caller's responsibility).
     */
    private void setLastStatus(TaskDTO taskDTO, String status) {
        JSONObject data = taskDTO.view().getData();
        data.put("lastStatus", status);
        taskDTO.setDataJson(JSONObject.toJSONString(data));
    }

    /**
     * Rewrites parent-table suffixes in {@code sql}: each parent table name of
     * the form _&lt;pipelineId&gt;_&lt;parentId&gt;_&lt;timeStamp&gt; is redirected to the
     * parent's last recorded timestamp, so tasks that are NOT re-run in this
     * session are read from their previous output tables.
     *
     * @param taskDTO       task whose SQL is being adjusted
     * @param sql           the generated SQL (may be empty)
     * @param timeStamp     this session's timestamp; 0 disables rewriting
     * @param needExecTasks tasks re-running in this session — their tables keep
     *                      the current timestamp and are skipped
     * @return the adjusted SQL (unchanged if nothing applies)
     */
    private String modifySql(TaskDTO taskDTO, String sql, Long timeStamp, Map<Long, TaskDTO> needExecTasks) {
        if (StringUtils.isEmpty(sql)) {
            return sql;
        }
        String parents = taskDTO.getParentId();
        // timeStamp == 0 disables rewriting entirely; checking it up front
        // avoids one taskService.queryById DB round-trip per parent (the old
        // code only skipped after the lookup)
        if (StringUtils.isEmpty(parents) || timeStamp == 0L) {
            return sql;
        }
        String newSql = sql;
        for (String id : parents.split(",")) {
            if (needExecTasks != null && needExecTasks.containsKey(Long.parseLong(id))) {
                logger.warn("task={} need to re-run", id);
                continue;
            }
            TaskDTO dto = taskService.queryById(Long.parseLong(id));
            Long lastTimeStamp = this.getLastTimeStamp(dto);
            // NOTE(review): lastTimeStamp is 0 when the parent never ran — the
            // rewrite then targets a "_0"-suffixed table; confirm intended
            String pattern = String.format("_%s_%s_%s", dto.getPipelineId(), id, timeStamp);
            String replacePattern = String.format("_%s_%s_%s", dto.getPipelineId(), id, lastTimeStamp);
            // literal replacement — the suffix is purely numeric, regex
            // semantics of replaceAll were unnecessary
            newSql = newSql.replace(pattern, replacePattern);
        }
        return newSql;
    }

    /**
     * Removes the "isSample" flag from tasks that an abnormally-terminated run
     * left stuck in CREATE or RUNNING state, so a new sampling run can start
     * cleanly.
     */
    private void clearSampleFlag(Long pipelineId) {
        for (TaskDTO task : taskService.queryByPipeline(pipelineId)) {
            JSONObject data = task.view().getData();
            if (!data.containsKey("isSample")) {
                continue;
            }
            String state = data.getString("isSample");
            if (state.equals("CREATE") || state.equals("RUNNING")) {
                data.remove("isSample");
                task.setDataJson(data.toJSONString());
                taskService.update(task);
            }
        }
    }

    /**
     * Starts a sampling run for the pipeline: clears stale isSample flags,
     * triggers scheduling from {@code currentTaskId}, and registers the new
     * session as a sampling session.
     *
     * <p>Sampling can be toggled per project (app.sampling) or per request via
     * a "sampling" flag in the HTTP payload. Sampling and full-dose phases may
     * use different compute engines, so a task's content (e.g. sqlText) can
     * differ between phases: a generic operator like Kmeans carries SQL while
     * sampling, but a command line invoking an operator jar in the full phase.
     * The front end tracks the phase via task.datajson "isSample":
     * CREATE -&gt; RUNNING during sampling, SUCCESS || FAIL in the full phase.
     *
     * @param pipelineId    pipeline to run
     * @param currentTaskId task to start scheduling from
     * @return the new session id
     */
    public Long triggerSampling(Long pipelineId, Long currentTaskId) {
        this.clearSampleFlag(pipelineId);
        Long sessionId = this.trigger(pipelineId, currentTaskId);
        taskManager.addSamplingSession(sessionId);
        return sessionId;
    }

    /**
     * Starts a full-dose (non-sampling) run.
     *
     * @param pipelineId    pipeline to run
     * @param currentTaskId task to start scheduling from (0 = whole pipeline)
     * @param restFlag      for visualization-built views: force-reset the
     *                      sampling flag before running
     * @return the new session id
     */
    public Long triggerFullDose(Long pipelineId, Long currentTaskId, boolean restFlag) {
        // restFlag=true means a front-end configuration widget chose to run
        // only the full phase; mimic a normal node's data-view run by
        // resetting its flag to isSample:CREATE first (a normal node starts
        // full execution from that state). Skipped for the virtual root id 0.
        boolean shouldReset = restFlag && currentTaskId != 0;
        if (shouldReset) {
            this.resetSampleFlag(currentTaskId);
        }
        return this.trigger(pipelineId, currentTaskId);
    }

    /**
     * Resets a single task's sampling flag to "CREATE" so it goes through
     * full-dose execution, and persists the change.
     *
     * @param targetTaskId id of the task to reset
     */
    private void resetSampleFlag(Long targetTaskId) {
        TaskDTO target = taskService.queryById(targetTaskId);
        JSONObject dataJson = target.view().getData();
        dataJson.put("isSample", "CREATE");
        target.setDataJson(dataJson.toJSONString());
        taskService.update(target);
    }


    /**
     * Convenience overload: triggers a non-graph run.
     *
     * <p>currentTaskId == 0 means "whole pipeline"; the SQL may need extra
     * handling / taskManager setup for that case.
     * TODO NAP-2207 potential data-leak risk for full-pipeline triggers
     */
    private Long trigger(Long pipelineId, Long currentTaskId) {
        return trigger(pipelineId, currentTaskId, false);
    }

    /**
     * Creates a pipeline run (session): selects the tasks that must execute,
     * builds one TaskInstance per selected task, adjusts their SQL to read the
     * parents' last output tables, wires instance parent links, and registers
     * everything with the TaskManager for the scheduled loop to drive.
     *
     * @param pipelineId    pipeline to run
     * @param currentTaskId task to start scheduling from (0 = whole pipeline)
     * @param isGraph       when false, graph-type tasks are short-circuited to SUCCESS
     * @return the new session id (pipeline instance id)
     */
    @Transactional(isolation = Isolation.SERIALIZABLE)
    public Long trigger(Long pipelineId, Long currentTaskId, boolean isGraph) {

        PipelineInstanceDTO pipelineInstanceDTO = pipelineInstanceService.createInstance(pipelineId);
        Long sessionId = pipelineInstanceService.save(pipelineInstanceDTO);

        List<TaskDTO> tasks = taskService.queryByPipeline(pipelineId);

        Map<Long, TaskDTO> taskMap = Maps.newHashMap();
        Map<Long, String> taskRelationMap = Maps.newHashMap();

        Set<Long> forbiddenTasks = Sets.newHashSet();
        Set<Long> lastFailTasks = Sets.newHashSet();

        // single timestamp for the whole session; used to suffix output tables
        long timeStamp = System.currentTimeMillis();

        // index tasks, record parent relations, collect forbidden and previously-failed tasks
        for (TaskDTO task : tasks) {
            taskMap.put(task.getId(), task);
            String parentId = task.getParentId();
            if (StringUtils.isNotEmpty(parentId)) {
                taskRelationMap.put(task.getId(), parentId);
            }
            if (task.isForbidden()) {
                forbiddenTasks.add(task.getId());
            }
            String lastStatus = this.getLastStatusForTask(task);
            if (StringUtils.isNotEmpty(lastStatus) && !lastStatus.equals(TaskInstanceStatus.SUCCESS.toString())) {
                lastFailTasks.add(task.getId());
            } else if (StringUtils.isEmpty(lastStatus) && !TaskUtil.isDataNode(task)) {
                // task was newly added and has never run: treat as "failed last time" so it executes
                lastFailTasks.add(task.getId());
            }
        }

        Map<Long, TaskDTO> childrenMap = getDescendants(taskMap, currentTaskId);

        Map<Long, TaskDTO> parentsMap = getAncestors(taskMap, currentTaskId);

        Map<Long, TaskDTO> childAndParentMap = mergeAncestorAndDescendant(parentsMap, childrenMap);

        Map<Long, TaskFuture> instanceMap = Maps.newHashMap();
        // taskId, instance
        Map<Long, TaskInstanceDTO> instanceDTOMap = Maps.newHashMap();

        Map<Long, TaskDTO> needExecTasks = Maps.newHashMap();

        // select the tasks that must execute in this session
        for (TaskDTO task : tasks) {
            if (!isLinked(task)) {
                logger.warn("alg task or etl task not linked!!!");
                continue;
            }
            if (forbiddenTasks.contains(task.getId())) {
                logger.warn("task={} is forbidden!!!", task.getId());
                continue;
            }
            if (isAncestorForbidden(forbiddenTasks, taskRelationMap, task.getId())) {
                logger.warn("task={} has ancestor node forbidden!!", task.getId());
                continue;
            }
            if (childAndParentMap.size() != 0 && !childAndParentMap.containsKey(task.getId())) {
                // task is outside the connected subgraph of currentTaskId: skip
                logger.warn("task={} not in dag", task.getId());
                continue;
            }
            if ((childrenMap.size() != 0 && !childrenMap.containsKey(task.getId())) &&
                    (!isAncestorLastTimeFail(lastFailTasks, taskRelationMap, task.getId()))) {
                // not a descendant of currentTaskId and no ancestor failed last time: nothing to redo
                logger.warn("task={} not a child of currentTaskId and no ancestor was failed last time", task.getId());
                continue;
            }

            needExecTasks.put(task.getId(), task);
            TaskInstanceDTO instance = new TaskInstanceDTO(task, sessionId);
            instance.setStatus(TaskInstanceStatus.RUNNING.toString());

            taskInstanceService.save(instance);
            logger.warn(String.format("Instance(%s) Create, %s", instance.getId(), instance));

            // data nodes succeed immediately; a clean node that is itself the trigger point also succeeds immediately
            if (TaskUtil.isDataNode(task) ||
                    (task.getId().equals(currentTaskId) && task.getType().equals(TaskTypeEnum.TASK_TYPE_CLEAN.getVal()))
            ) {
                instance.setStatus(TaskInstanceStatus.SUCCESS.toString());
                instance.setProgress(100);
            } else if (task.getType().equals(TaskTypeEnum.TASK_TYPE_GRAPH.getVal()) && !isGraph) {
                // graph tasks are skipped (auto-SUCCESS) unless this is a graph-triggered run
                instance.setStatus(TaskInstanceStatus.SUCCESS.toString());
                instance.setProgress(100);
            } else if (task.getType().equals(TaskTypeEnum.TASK_TYPE_MODEL.getVal())) {
                //TODO AY
                instance.setStatus(TaskInstanceStatus.RUNNING.toString());
                //TaskDTO task = taskService.queryById(taskId);
                Long taskId = task.getId();
                JSONObject dataJson = JSONObject.parseObject(task.getDataJson());
                JSONArray outputs = dataJson.getJSONArray("output");
                JSONObject output = outputs.getJSONObject(0);
                Long modelId = dataJson.getLong("modelId");
                // prediction output table: pipeline.ml_<userId>_<taskId>
                String target = "pipeline." + "ml_" + task.getUserId() + "_";
                long timeStampModel = System.currentTimeMillis();
                target += task.getId();
                this.setLastTimeStamp(task, timeStampModel);
                taskService.matchParam(dataJson, task);
                taskService.update(task);
                String featureX = dataJson.getString("feature_X");
                JSONArray inputs = dataJson.getJSONArray("input");
                JSONObject input = inputs.getJSONObject(0);
                String source = input.getString("tableName");
                Character lastChar = source.charAt(source.length() - 1);
                if (lastChar.equals('_')) {
                    // trailing '_' means the table name is incomplete: append the parent's timestamp suffix
                    Long parentTimeStamp = dataJson.getJSONArray("parentTimeStamps").getLong(0);
                    source = source + parentTimeStamp;
                }
                //String appArgs = params.getString("param");
                // sparktime selects the engine: absent/0 -> Flask service, otherwise Spark job
                Long sparktime = mlModelService.queryMetricsById(modelId).getSparktime();
                if (null == sparktime || sparktime.equals(0L)) {
                    //FLASK
                    try {
                        JSONObject params = new JSONObject();
                        params.put("apiPath", "ml_model");
                        params.put("model_id", modelId);
                        params.put("execution", "predict");
                        params.put("target", target);
                        params.put("source", source);
                        params.put("feature_X", featureX);
                        params.put("taskId", taskId);
                        String res = mlModelService.submitFlaskJob(params);
                    } catch (IOException e) {
                        // NOTE(review): IOException is swallowed after printing — the instance is
                        // still marked SUCCESS below; consider logging and failing the instance
                        e.printStackTrace();
                    }
                } else {
                    //SPARK
                    String appArgs = "--id " + modelId + " --target " + target + " --exe predict"
                            + " --source " + source + " --feature_col " + featureX
                            + " --task_id " + taskId + " --instance_id " + 100;
                    mlModelService.exec(appArgs);
                }


                JSONObject newParam = dataJson.getJSONObject("param");
                newParam.put("execution", "predict");
                newParam.put("feature_X", featureX);
                newParam.put("source", output.getString("tableName"));
                newParam.put("model_saved_path", dataJson.getString("modelPath"));
                newParam.put("model_id", dataJson.getLong("modelId"));
                newParam.put("target", target);
                // NOTE(review): marked SUCCESS right after submitting the predict job —
                // presumably completion is tracked elsewhere; confirm
                instance.setStatus(TaskInstanceStatus.SUCCESS.toString());
                instance.setProgress(100);
                instance.setDataJson(newParam.toJSONString());
            } else {
                String sql = "";
                int algType = task.view().getData().getIntValue("algType");
                if (task.getType().equals(TaskTypeEnum.TASK_TYPE_ETL.getVal()) &&
                        (ETLEnum.JOIN.getVal() == algType || ETLEnum.UNION.getVal() == algType)) {
                    // join/union operators must support autojoin, so their SQL is generated lazily — not set here
                } else {
                    sql = taskService.initSql(task, timeStamp);
                }
                instance.setSqlText(sql);
                instance.setStatus(TaskInstanceStatus.CREATE.toString());
            }
            taskInstanceService.update(instance);
            instanceDTOMap.put(instance.getTaskId(), instance);
        }

        // NOTE(review): informational message logged at ERROR level — consider WARN/INFO
        logger.error("going to execute task list -> {}", instanceDTOMap.keySet().toArray());

        // adjust each instance's SQL so it reads parents' last successful output tables
        for (Map.Entry<Long, TaskInstanceDTO> entry : instanceDTOMap.entrySet()) {
            Long taskId = entry.getKey();
            TaskInstanceDTO taskInstanceDTO = entry.getValue();
            String sql = taskInstanceDTO.getSqlText();
            String newSql = this.modifySql(needExecTasks.get(taskId), sql, timeStamp, needExecTasks);
            if (StringUtils.isNotEmpty(sql) && !sql.equals(newSql)) {
                taskInstanceDTO.setSqlText(newSql);
                taskInstanceService.update(taskInstanceDTO);
            }
            instanceMap.put(taskInstanceDTO.getTaskId(), new TaskFuture(taskInstanceDTO, null));
        }
        Map<Long, TaskInstanceDTO> tmpInstanceMap = new HashMap<>();
        //Construct taskInstance relationship
        for (TaskFuture taskFuture : instanceMap.values()) {
            List<Long> parents = Lists.newArrayList();
            TaskInstanceDTO instance = taskFuture.getInstance();
            String parentTaskId = taskRelationMap.get(instance.getTaskId());
            if (StringUtils.isNotEmpty(parentTaskId)) {

                String[] spliter = StringUtils.split(parentTaskId, ",");
                for (String singleParentStr : spliter) {
                    TaskDTO task = taskMap.get(Long.parseLong(singleParentStr));
                    if (task != null) {
                        TaskFuture parentTaskFuture = instanceMap.get(task.getId());
                        if (parentTaskFuture != null) {
                            parents.add(parentTaskFuture.getInstance().getId());
                        } else {
                            // parent is not part of this session: link to its latest existing instance
                            TaskInstanceDTO dto = taskInstanceService.queryLatestInstanceForTask(task.getId());
                            if (dto != null) {
                                parents.add(dto.getId());
                            }
                        }
                    }
                }
            }
            if (CollectionUtils.isNotEmpty(parents)) {
                instance.setParentId(Joiner.on(",").join(parents));
                taskInstanceService.update(instance);
            }
            tmpInstanceMap.put(instance.getId(), instance);
        }
        // hand everything to the TaskManager; the scheduled loop drives execution from here
        taskManager.putDagTaskMap(sessionId, tmpInstanceMap);
        taskManager.putTaskHolder(sessionId, new ArrayList<TaskFuture>(instanceMap.values()));
        taskManager.putTimeHolder(sessionId, timeStamp);
        return sessionId;
    }

    //TODO  RM -AY
//    private boolean updateOutputForTask(TaskDTO task, TaskInstanceDTO instanceDTO) {
//        String logInfo = instanceDTO.getLogInfo();
//        JSONObject result;
//        try {
//            result = JSONObject.parseObject(logInfo);
//            JSONArray outputs = result.getJSONObject("result").getJSONArray("output_params");
//            JSONObject data = JSONObject.parseObject(task.getDataJson());
//            JSONArray output = new JSONArray();
//            for (int i = 0; i < outputs.size(); ++i) {
//                JSONObject item = outputs.getJSONObject(i);
//                String tableName = item.getString("out_table_name");
//                JSONArray tableCols = item.getJSONArray("output_cols");
//                JSONArray columnTypes = item.getJSONArray("col_types");
//                JSONObject tableDesc = new JSONObject();
//                JSONObject semantic = new JSONObject();
//                for (String col: tableCols.toJavaList(String.class)){
//                    semantic.put(col, "null");
//                }
//                tableDesc.put("tableName", tableName);
//                tableDesc.put("semantic", semantic);
//                tableDesc.put("tableCols", tableCols);
//                tableDesc.put("columnTypes", columnTypes);
//                output.add(tableDesc);
//            }
//            data.put("output", output);
//            task.setDataJson(data.toJSONString());
//        } catch (Exception e) {
//            return false;
//        }
//        return true;
//    }


    /**
     * Scheduler heartbeat (every second): advances each active session, then
     * reconciles finished sessions and cleans up jobs pending termination.
     */
    @Scheduled(cron = "0/1 * *  * * ?")
    public void schedule() {
        taskManager.getTaskHolder().keySet().forEach(this::run);
        check();
        cleanup();
    }

    /**
     * Cancels submitted jobs that were flagged for removal. Each entry in
     * needStopTaskMap is retried until the kill succeeds or a patience
     * threshold (15 attempts) is exceeded, after which the operator must stop
     * it manually. Retry counters are cleared once no stops are pending.
     */
    private void cleanup() {
        if (needStopTaskMap.isEmpty()) {
            // nothing pending: retry counters are stale, drop them
            if (!retryStopTaskMap.isEmpty()) {
                retryStopTaskMap.clear();
            }
            return;
        }
        for (Map.Entry<Long, String> entry : needStopTaskMap.entrySet()) {
            TaskInstanceDTO instance = taskInstanceService.queryById(entry.getKey());
            String applicationId = instance.getApplicationId();
            Long instanceId = instance.getId();
            // patience threshold; getOrDefault fixes the NPE the old unboxed
            // get() threw when no counter existed for the instance yet
            if (retryStopTaskMap.getOrDefault(instanceId, 0) > 15) {
                logger.info("[CLEAN UP] taskInstanceId {} failed, please stop it manually", instanceId);
                needStopTaskMap.remove(entry.getKey());
                continue;
            }
            if (StringUtils.isEmpty(applicationId)) {
                // job not yet registered with the cluster; count the attempt and retry next round
                retryStopTaskMap.merge(instanceId, 1, Integer::sum);
            } else if (restTemplateUtil.killJob(applicationId)) {
                logger.info("[CLEAN UP] job {} success, has removed from holder...", applicationId);
                needStopTaskMap.remove(entry.getKey());
            } else {
                logger.info("[CLEAN UP] job {} failed, plan to stop in next round", applicationId);
                retryStopTaskMap.merge(instanceId, 1, Integer::sum);
            }
        }
    }


    /**
     * Reconciles running sessions: propagates terminal status to the pipeline
     * instance and evicts finished sessions from the task holder.
     */
    private void check() {
        for (Long sessionId : taskManager.getTaskHolder().keySet()) {
            String status = taskInstanceService.queryStatusBySessionId(sessionId);
            boolean succeeded = TaskInstanceStatus.SUCCESS.toString().equals(status);
            boolean terminated = TaskInstanceStatus.KILLED.toString().equals(status)
                    || TaskInstanceStatus.FAIL.toString().equals(status)
                    || TaskInstanceStatus.STOP.toString().equals(status);
            if (succeeded) {
                pipelineInstanceService.updateStatus(sessionId, TaskInstanceStatus.SUCCESS.toString());
                taskManager.remove(sessionId);
                logger.warn("session=" + sessionId + " execute success, remove from holder...");
            } else if (terminated && !taskManager.isRunning(sessionId)) {
                // only evict once no instance of the session is still running
                pipelineInstanceService.updateStatus(sessionId, status);
                taskManager.remove(sessionId);
                logger.warn("session=" + sessionId + " execute " + status + ", remove from holder...");
            } else {
                logger.warn("session={} still running", sessionId);
            }
        }
    }


    @Transactional
    public void run(Long sessionId) {
        //先别删，高并发压测时候 用这个看看线程的情况
        int queueSize = tpe.getQueue().size();
        int activeCount = tpe.getActiveCount();
        long completedTaskCount = tpe.getCompletedTaskCount();
        long taskCount = tpe.getTaskCount();
        logger.info("当前排队线程数：" + queueSize + "当前活动线程数：" + activeCount + "执行完成线程数：" + completedTaskCount + "总线程数：" + taskCount);

        List<TaskFuture> taskFutures = taskManager.getTaskHolder().get(sessionId);

        for (TaskFuture taskFuture : taskFutures) {
            TaskInstanceDTO instance = taskFuture.getInstance();
            TaskDTO taskDTO = taskService.queryById(instance.getTaskId());
            Future<TaskRunnerResult> future = taskFuture.getFuture();
            if (null == taskDTO) {//运行过程中将节点删除了
                instance.setStatus(TaskInstanceStatus.FAIL.toString());
                instance.setLogInfo(String.format(Constant.errorTpl, "task is null"));
                taskInstanceService.update(instance);
            }
            //已经结束的任务不需要往下走了
            if (TaskInstanceStatus.SUCCESS.toString().equals(instance.getStatus()) ||
                    TaskInstanceStatus.FAIL.toString().equals(instance.getStatus()) ||
                    TaskInstanceStatus.KILLED.toString().equals(instance.getStatus())) {
                continue;
            }
            if (future != null) {
                if (future.isDone()) {
                    TaskRunnerResult runnerResult = null;
                    int status = -1;
                    try {
                        runnerResult = future.get();
                    } catch (Exception e) {
                        logger.error("DAGScheduler failed to get taskFuture result, since {}", e.getMessage());
                        logger.error("[ERROR] execution error happened. sessionId: {}, taskId: {}, taskInstanceId： {}",
                                sessionId, instance.getTaskId(), instance.getId());
                    }
                    if (null == runnerResult) {
                        runnerResult = TaskRunnerResult.fail();
                        logger.error("[ERROR] execution result is null. sessionId: {}, taskId: {}, taskInstanceId： {}",
                                sessionId, instance.getTaskId(), instance.getId());
                    }
                    status = runnerResult.getStatus();
                    logger.info("runnerResult -> {}", runnerResult);
                    if (status == 0) {
                        // 如果是elt操作,或者是自定义算子
                        instance.setStatus(TaskInstanceStatus.SUCCESS.toString());
                        this.setLastStatus(taskDTO, TaskInstanceStatus.SUCCESS.toString());
                        instance.setLogInfo(runnerResult.getOutput());
                        taskInstanceService.update(instance);
                        // 设置totalRow
                        JSONObject jsonObject = JSONObject.parseObject(taskDTO.getDataJson());
                        jsonObject.put("isSample", "SUCCESS");
                        taskDTO.setDataJson(JSONObject.toJSONString(jsonObject));
                        taskService.update(taskDTO);
                        if (taskDTO.isThisTypeNode(TaskTypeEnum.TASK_TYPE_ALGOPY)){
                            //TODO aiwork-py 没有更新instance, 但是更新了task的结果表 这里替他更新下
                            //这里把 instance 的setParams 同步给 taskDTO
                            ImputationUtil.syncParams(instance, taskDTO);
                        }
                        if (taskDTO.isThisTypeNode(TaskTypeEnum.TASK_TYPE_JLAB)){
                            JSONArray output = JSONObject.parseObject(instance.getDataJson())
                                    .getJSONObject("inputInfo").getJSONArray("output");
//                            JSONObject taskInputInfo = jsonObject.getJSONObject("inputInfo");
//                            taskInputInfo.remove("output");
                            jsonObject.remove("output");
                            jsonObject.put("output",JSONArray.parseArray(output.toJSONString()));
                            taskDTO.setDataJson(JSONObject.toJSONString(jsonObject));
                            taskService.update(taskDTO);
                        }
                        if (taskDTO.isThisTypeNode(TaskTypeEnum.TASK_TYPE_ALGO)){
                            JSONArray output = TaskInstanceDTOUtil.getOutputArray(instance);
                            if (null == output || ( output.size() == 0 && !runnerResult.getOutput().isEmpty())){
                                //在成功状态下， 只有loginfo 但是没有更新output 帮它更新
                                List<JSONObject> outputParams = runnerResult.getOutParams().toJavaList(JSONObject.class);
                                String outTableName = "";
                                String modelTableName = "";
                                if (taskDTO.isThisTypeAlgo(AlgEnum.KMEANS)) {
                                    for (JSONObject item : outputParams) {
                                        String s1 = item.getString("out_table_name");
                                        if (s1.startsWith("pipeline.solid_kmeans_")) {
                                            outTableName = s1;
                                        }
                                        if (s1.startsWith("pipeline.solid_model_table_")) {
                                            modelTableName = s1;
                                        }
                                    }

                                    JSONArray input = TaskInstanceDTOUtil.getInputArray(instance);
                                    List<String> inputCols = input.getJSONObject(0).getJSONArray("tableCols")
                                            .toJavaList(String.class);
                                    List<String> inputColumnTypes = input.getJSONObject(0).getJSONArray("columnTypes")
                                            .toJavaList(String.class);
                                    JSONObject inputSemantics = input.getJSONObject(0).getJSONObject("semantic");
                                    inputCols.add("cluster_id");
                                    inputColumnTypes.add("text");
                                    inputSemantics.put("cluster_id", null);

                                    JSONObject outputItem = new JSONObject();
                                    outputItem.put("tableName", outTableName);
                                    outputItem.put("tableCols", inputCols);
                                    outputItem.put("nodeName", AlgEnum.KMEANS.toString());
                                    outputItem.put("semantic", inputSemantics);
                                    outputItem.put("columnTypes", inputColumnTypes);
                                    outputItem.put("subType", 11);
                                    output = new JSONArray();
                                    output.add(outputItem);
                                    TaskInstanceDTOUtil.updateJsonWithSpecificKey(instance, OUTPUT_HEADER, output);
                                    TaskDTOUtil.updateJsonWithSpecificKey(taskDTO, OUTPUT_HEADER, output);

                                    String[] parts = outTableName.split("_");
                                    Long fakeLastTimeStamp = Long.parseLong(parts[parts.length - 1]);
                                    TaskInstanceDTOUtil.updateJsonWithSpecificKey(instance, OUTPUT_HEADER, output);
                                    TaskDTOUtil.updateJsonWithSpecificKey(taskDTO, OUTPUT_HEADER, output);

                                    TaskInstanceDTOUtil.updateJsonWithSpecificKey(instance, LAST_TIMESTAMP_HEADER, fakeLastTimeStamp);
                                    TaskDTOUtil.updateJsonWithSpecificKey(taskDTO, LAST_TIMESTAMP_HEADER, fakeLastTimeStamp);
                                }
                            }
                        }
                    } else {
                        instance.setStatus(TaskInstanceStatus.FAIL.toString());
                        this.setLastStatus(taskDTO, TaskInstanceStatus.FAIL.toString());
                        pipelineInstanceService.updateStatus(instance.getSessionId(), TaskInstanceStatus.FAIL.toString());
                        instance.setLogInfo(runnerResult.getOutput());
                        JSONObject jsonObject = JSONObject.parseObject(taskDTO.getDataJson());
                        jsonObject.put("isSample", "FAIL");
                        taskDTO.setDataJson(JSONObject.toJSONString(jsonObject));
                        taskService.update(taskDTO);
                    }
                    this.setLastTimeStamp(taskDTO, taskManager.getTimeHolder().get(sessionId));
                    taskService.update(taskDTO);
                    taskInstanceService.update(instance);
                    taskManager.updateDagTaskMap(sessionId, instance);
                    logger.warn("---------final-------------");
                    logger.warn(String.format("Instance(%s) %s, %s", instance.getId(), status, instance));
                    logger.warn("update lastStatus for taskId={} task {}", taskDTO.getId(), taskDTO);
                }
            } else {
                if (instance.getType().equals(TaskTypeEnum.TASK_TYPE_DATA.getVal())) {
                    continue;
                }
                if (isReady(instance)) {
                    Callable<TaskRunnerResult> runner = null;
                    JSONObject jsonObject = JSONObject.parseObject(taskDTO.getDataJson());

                    if (instance.getType().equals(TaskTypeEnum.TASK_TYPE_JLAB.getVal())){
                        //JupyterLab Node
                        runner = new JLabRunner(jlabService, instance);
                    }
                    else if (instance.getType().equals(TaskTypeEnum.TASK_TYPE_CLEAN.getVal())) {
                        // 清洗算子
                        runner = new DataCleanRunner(tColumnService, instance, taskManager);
                    } else if (jsonObject.containsKey("subType") &&
                            jsonObject.getInteger("subType").equals(SubTypeEnum.GRAPH_NET.getVal())) {
                        //算子节点如果是图构建，暂时走图构建的调度
                        runner = new GraphRunner(graphService, graphInstanceService, janusGraphEmbedService, instance);
                    } else if (instance.getType().equals(TaskTypeEnum.TASK_TYPE_ETL.getVal())) {
                        //如果是elt操作，走原来的gp调度
                        if (instance.getType().equals(TaskTypeEnum.TASK_TYPE_ETL.getVal())) {
                            int algType = taskDTO.view().getData().getIntValue("algType");
                            if (ETLEnum.JOIN.getVal() == algType || ETLEnum.UNION.getVal() == algType) {
                                // 延迟执行join算子initSql时候，datajson已经修改，因此jsonObject需要重新赋值
                                jsonObject = JSONObject.parseObject(taskDTO.getDataJson());
                                if (taskManager.getSamplingSessions().contains(sessionId) &&
                                        jsonObject.containsKey("isSample") &&
                                        jsonObject.getString("isSample").equals("CREATE")) {
                                    jsonObject.put("isSample", "RUNNING");
                                    taskDTO.setDataJson(JSONObject.toJSONString(jsonObject));
                                }
                                //同时从上游节点获取输出信息 更新节点
                                taskService.updateJoinUnionConfig(taskDTO);
                                taskDTO = taskService.queryById(taskDTO.getId());
                                // 支持autojoin延迟计算sql语句
                                String sql = taskService.initSql(taskDTO, taskManager.getSId(sessionId));
                                sql = this.modifySql(taskDTO, sql, taskManager.getSId(sessionId), null);
                                logger.warn("join | union init sql using datajson -> {}", taskDTO.getDataJson());
                                logger.warn("join | union updated sql -> {}", sql);
                                instance.setSqlText(sql);
                                taskInstanceService.update(instance);
                            }else if(ETLEnum.FILTER.getVal() == algType){
                                //跟JOIN UNION 一样 也是延迟加载，重新赋值 针对NAP-3594
                                String sql = FilterHelper.rebuildSql(instance.getSqlText(), instance);
                                instance.setSqlText(sql);
                                taskInstanceService.update(instance);
                            }
                            runner = new TaskRunner(gpDataProvider, instance);
                        }
                    } else if (instance.getType().equals(TaskTypeEnum.TASK_TYPE_ALGOPY.getVal())) {
                        TaskDTO parentNode = taskService.queryById(Long.parseLong(taskDTO.getParentId()));
                        TaskInstanceDTOUtil.syncInstanceInputByParentDTO(parentNode, instance);
                        ImputationUtil.initParams(instance, taskDTO);
                        runner = new FlaskSubmitRunner(restTemplateUtil, taskInstanceService, instance);
                    } else if (taskManager.getSamplingSessions().contains(sessionId) &&
                            jsonObject.containsKey("isSample") &&
                            jsonObject.getString("isSample").equals("CREATE")) {
                        // 算子采样过程 走原先的madlib
                        jsonObject.put("isSample", "RUNNING");
                        runner = new TaskRunner(gpDataProvider, instance);
                        taskDTO.setDataJson(JSONObject.toJSONString(jsonObject));
                    } else {
                        // 其它情况 根据配置选择引擎
                        if (engineSelector.isSpark()) {
                            runner = new SparkSubmitRunner(restTemplateUtil, instance, appIdInstanceMap, taskInstanceService);
                        } else if (engineSelector.isMadlib()) {
                            runner = new TaskRunner(gpDataProvider, instance);
                        }
                    }

                    if (!taskDTO.getType().equals(TaskTypeEnum.TASK_TYPE_CLEAN.getVal())) {
                        future = executor.submit(runner);
                        taskFuture.setFuture(future);
                    }

                    instance.setStatus(TaskInstanceStatus.RUNNING.toString());
                    taskInstanceService.update(instance);
                    taskManager.updateDagTaskMap(sessionId, instance);
                    logger.warn(String.format("Instance(%s) Submit, %s", instance.getId(), instance));

                    this.setLastStatus(taskDTO, TaskInstanceStatus.RUNNING.toString());
                    this.setLastTimeStamp(taskDTO, taskManager.getTimeHolder().get(sessionId));
                    taskService.update(taskDTO);

                    //清洗节点后提交，防止taskService.update线程冲突
                    if (taskDTO.getType().equals(TaskTypeEnum.TASK_TYPE_CLEAN.getVal())) {
                        future = executor.submit(runner);
                        taskFuture.setFuture(future);
                    }
                    logger.warn("task={} running", taskDTO.getId());
                } else if (taskManager.isAncestorFail(sessionId, instance)) {
                    logger.error("taskInstance={} ancestor has fail!!!", instance.getId());
                    // 祖先节点如果有失败，需要把当前节点也设置为失败
                    instance.setStatus(TaskInstanceStatus.FAIL.toString());
                    taskManager.updateDagTaskMap(sessionId, instance);
                    taskInstanceService.update(instance);
                    JSONObject jsonObject = JSONObject.parseObject(taskDTO.getDataJson());
                    if (taskManager.getSamplingSessions().contains(sessionId) && jsonObject.containsKey("isSample") &&
                            jsonObject.getString("isSample").equals("RUNNING")
                    ) {
                        jsonObject.put("isSample", "FAIL");
                        taskDTO.setDataJson(JSONObject.toJSONString(jsonObject));
                        taskService.update(taskDTO);
                    }
                }
            }

        }
    }

    /**
     * Checks whether a task instance is ready to run: every parent instance
     * listed in its comma-separated parentId must be in SUCCESS state.
     *
     * @param instance the task instance to check; may be null
     * @return true if the instance exists and has no parents, or all parent
     *         instances are SUCCESS; false for a null instance, a missing
     *         parent, or any parent not yet successful
     */
    private boolean isReady(TaskInstanceDTO instance) {
        if (null == instance) {
            return false;
        }
        String parentId = instance.getParentId();
        if (StringUtils.isEmpty(parentId)) {
            // Root node without parents can always start.
            return true;
        }
        for (String singleParentId : StringUtils.split(parentId, ",")) {
            TaskInstanceDTO parentInstance = taskInstanceService
                    .queryById(Long.parseLong(singleParentId));
            // Guard against a parent instance that no longer exists (e.g.
            // deleted concurrently); treat as not ready instead of NPE-ing.
            if (null == parentInstance
                    || !TaskInstanceStatus.SUCCESS.toString().equals(parentInstance.getStatus())) {
                return false;
            }
        }
        return true;
    }

    /**
     * 更新算子运行最终状态 (update the final status of a finished job).
     *
     * Invoked from the job-status callback: maps the reported finalStatus to
     * an internal instance status, persists it, and then re-reads logInfo so
     * the in-memory DTO matches what the update actually stored.
     *
     * @param jsonObject callback payload with "id" (applicationId),
     *                   "finalStatus" and "diagnostics"
     */
    public void updateJobStatus(JSONObject jsonObject) {
        String applicationId = jsonObject.getString("id");
        String finalStatus = jsonObject.getString("finalStatus");
        String diagnostics = jsonObject.getString("diagnostics");
        TaskInstanceDTO instanceDTO = appIdInstanceMap.remove(applicationId);
        String status;
        // 有时候 spark不稳定，实际上提交成功的任务 返回值却是UNDEFINED，所以加了一层判断
        // (Spark sometimes reports UNDEFINED for jobs that actually succeeded,
        // so UNDEFINED is mapped to SUCCESS as well.)
        if (JobStatus.KILLED.toString().equals(finalStatus)) {
            status = "KILLED";
        } else if (JobStatus.SUCCEEDED.toString().equals(finalStatus)
                || JobStatus.UNDEFINED.toString().equals(finalStatus)) {
            status = "SUCCESS";
        } else {
            status = "FAIL";
        }
        logger.info("received callback, applicationId={}, status={}", applicationId, status);
        if (null != instanceDTO) {
            instanceDTO.setProgress(100);
            if (!"SUCCESS".equals(status)) {
                // diagnostics may be absent in the callback payload; avoid an
                // NPE and escape double quotes (literal replace, no regex
                // needed as with the former replaceAll).
                String detail = (null == diagnostics) ? "" : diagnostics.replace("\"", "'");
                instanceDTO.setLogInfo(String.format(Constant.errorTpl, detail));
            }
            instanceDTO.setStatus(status);
            taskInstanceService.update(instanceDTO);
            // The persisted logInfo may differ from what was set above;
            // reload it so callers holding this DTO see the stored value.
            instanceDTO.setLogInfo(taskInstanceService.queryById(instanceDTO.getId()).getLogInfo());
            logger.info("updateJobStatus success, taskInstanceId={}", instanceDTO.getId());
        }
    }

    /**
     * 停止running状态的task (stop every RUNNING task instance of a pipeline).
     *
     * Each running instance is first marked FAIL; then its YARN job is
     * killed. Instances that have no applicationId yet, or whose kill request
     * fails, are cached in needStopTaskMap/retryStopTaskMap for a later
     * retry pass.
     *
     * @param sessionId pipeline session whose running tasks should be stopped
     */
    public void stopPipeline(Long sessionId) {
        List<TaskFuture> taskFutures = taskManager.getTaskHolder().get(sessionId);
        if (CollectionUtil.isEmpty(taskFutures)) {
            return;
        }
        for (TaskFuture taskFuture : taskFutures) {
            TaskInstanceDTO instance = taskFuture.getInstance();
            if (!TaskInstanceStatus.RUNNING.toString().equals(instance.getStatus())) {
                continue;
            }
            try {
                taskInstanceService.updateStatus(instance, TaskInstanceStatus.FAIL, "");
            } catch (Exception e) {
                // BUGFIX: the original used the SLF4J "{}" placeholder inside
                // String.format, so the task name was never substituted.
                // NOTE(review): consider passing e as the cause if a
                // DataScienceException(String, Throwable) constructor exists.
                throw new DataScienceException(
                        String.format("操作失败，更新%s任务状态失败", instance.getTaskName()));
            }
            if (null == instance.getApplicationId() || instance.getApplicationId().isEmpty()) {
                // 先去杀， 没杀死或者没有applicationId的先缓存下来，等会杀
                // (no applicationId yet -- remember the instance for a retry)
                needStopTaskMap.put(instance.getId(), "");
                retryStopTaskMap.putIfAbsent(instance.getId(), 0);
                continue;
            }
            if (!restTemplateUtil.killJob(instance.getApplicationId())) {
                // 有了applicationId 但是杀失败了，也缓存下来
                // (kill request failed -- cache so the retry loop tries again)
                needStopTaskMap.put(instance.getId(), instance.getApplicationId());
                retryStopTaskMap.putIfAbsent(instance.getId(), 0);
            }
        }
    }

    /**
     * find the lowest common ancestor
     *
     * For a single task the task itself is returned. Otherwise the ancestor
     * levels of every task are transposed and scanned level by level; the
     * deepest level whose per-task ancestor sets have a non-empty
     * intersection yields the LCA (deeper levels overwrite shallower ones).
     *
     * @param taskIds ids of the tasks whose common ancestor is wanted
     * @return the lowest common ancestor id, or 0L when none exists
     */
    public Long findLCA(List<Long> taskIds) {
        if (taskIds.size() == 1) {
            return taskIds.get(0);
        }
        Transpose transpose = new Transpose();
        for (Long taskId : taskIds) {
            transpose.append(getAncestorsWithLevel(taskId));
        }

        Long resultId = 0L;
        for (Object obj : transpose) {
            @SuppressWarnings("unchecked")
            List<Set<Long>> ids = (List<Set<Long>>) obj;
            // BUGFIX: intersect into a defensive copy -- the original called
            // retainAll on the first set returned by getAncestorsWithLevel,
            // mutating shared ancestor data in place.
            Set<Long> intersection = new HashSet<>(ids.get(0));
            for (int i = 1; i < taskIds.size(); i++) {
                intersection.retainAll(ids.get(i));
            }
            if (!intersection.isEmpty()) {
                resultId = intersection.iterator().next();
            }
        }
        return resultId;
    }
}